package com.hao.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo job that runs three group-window aggregations over the same click table
 * using Flink SQL windowing table-valued functions (TUMBLE, HOP, CUMULATE) and
 * prints each result as a changelog stream.
 */
public class WindowAggregation {

    /**
     * DDL for the {@code clickTable} source, read as CSV from {@code input/clicks.txt}.
     * The event-time attribute {@code et} is declared directly in the DDL as a computed
     * column: {@code ts / 1000} is passed to FROM_UNIXTIME (presumably {@code ts} is epoch
     * milliseconds and FROM_UNIXTIME expects seconds - confirm against the input file), and
     * the resulting date-time string is parsed by TO_TIMESTAMP. The watermark trails
     * {@code et} by one second.
     */
    private static final String CLICK_TABLE_DDL =
            "CREATE TABLE clickTable ("
                    + "user_name STRING,"
                    + "url STRING,"
                    + "ts BIGINT,"
                    + "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000)),"
                    + " WATERMARK FOR et AS et - INTERVAL '1' SECOND "
                    + ")WITH("
                    + " 'connector' = 'filesystem',"
                    + " 'path' = 'input/clicks.txt',"
                    + " 'format' = 'csv'"
                    + ")";

    /**
     * Entry point: registers the click table, defines the three windowed queries,
     * attaches a print sink to each and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Register the source table; its time attribute is defined in the DDL itself.
        tableEnv.executeSql(CLICK_TABLE_DDL);

        // 2. Tumbling window (windowing TVF syntax): INTERVAL '5' SECOND.
        Table tumbleWindow = tableEnv.sqlQuery(
                countPerUserQuery("TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND)"));

        // 3. Sliding (hop) window: INTERVAL '5' SECOND and INTERVAL '10' SECOND.
        Table hopWindow = tableEnv.sqlQuery(
                countPerUserQuery("HOP(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND,INTERVAL '10' SECOND)"));

        // 4. Cumulative window: INTERVAL '5' SECOND and INTERVAL '10' SECOND.
        Table cumulateWindow = tableEnv.sqlQuery(
                countPerUserQuery("CUMULATE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '5' SECOND,INTERVAL '10' SECOND)"));

        // 5. Convert each result to a changelog stream and print it with a label.
        tableEnv.toChangelogStream(tumbleWindow).print("tumble window:");
        tableEnv.toChangelogStream(hopWindow).print("hop window:");
        tableEnv.toChangelogStream(cumulateWindow).print("cumulate Window:");

        // 6. Run the job.
        env.execute();
    }

    /**
     * Builds the per-user count query over the given windowing TVF call.
     * The three queries differ only in the windowing function, so the shared
     * select / group-by text lives here.
     *
     * @param windowFunction the windowing TVF invocation placed inside {@code TABLE(...)}
     * @return the complete SQL query string
     */
    private static String countPerUserQuery(String windowFunction) {
        return "select user_name,count(1) as cnt,"
                + " window_end as endT "
                + " from TABLE("
                + " " + windowFunction
                + ") "
                + "group by user_name,window_end,window_start";
    }
}
